feat(spark): port Spark-compatible Parquet schema adapter from Comet #22341
Open
andygrove wants to merge 2 commits into
Adds a `parquet` feature to `datafusion-spark` exposing a `SparkPhysicalExprAdapterFactory` that mirrors Apache Spark's vectorized Parquet reader semantics. Faithful port of `apache/datafusion-comet`'s schema adapter on apache/main.

Closes apache#22339

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@shehabgamin fyi
- Short-circuit duplicate-field tree walk via Err propagation.
- Cache `types_match` on `SparkCastColumnExpr` to skip per-batch DataType compare.
- Drop redundant `Option<SchemaRef>` for `original_physical_schema` (gate on `case_sensitive` directly).
- Pre-build case-folded HashSet of physical names for O(1) `is_missing` lookup in `replace_missing_with_defaults` (was O(d × n)).
- Replace `O(n × m)` `eq_ignore_ascii_case` scan in `remap_physical_schema` with `HashSet<String>` of pre-lowercased names.
- Drop micros→millis specialization in `SparkCastColumnExpr::evaluate`; arrow's cast handles it.
- Dedupe `parse_field_id`/`field_id` between `schema_adapter.rs` and `parquet_support.rs`.
- Drop unnecessary `array.data_type().clone()` in `parquet_convert_array`.
- Reduce `remap_physical_schema` parameter list (3 bools → `&SparkParquetOptions`).
- Extract `find_field` and `rename_field` helpers; collapse 3x case-sensitive lookup duplication and 2x field-rename block duplication.
- Reduce test boilerplate: extract `execute_with_factory` helper used by all 3 odd-out tests; `temp_parquet_path()` returns `String` directly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
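The two HashSet items in the commit message above describe the same idea: fold the physical column names once, then answer each membership question with a hash lookup instead of a linear `eq_ignore_ascii_case` scan. The following is only a minimal sketch of that technique, not the code from this PR. The function names `physical_name_set` and `is_missing` and their signatures are assumptions made for illustration, and ASCII lower-casing is used so the result matches what an `eq_ignore_ascii_case` comparison would give.

```rust
use std::collections::HashSet;

/// Build the set of physical (file) column names once per file.
/// In case-insensitive mode the names are ASCII-lower-cased up front.
/// (Hypothetical helper; the name and signature are illustrative.)
fn physical_name_set(physical_names: &[&str], case_sensitive: bool) -> HashSet<String> {
    physical_names
        .iter()
        .map(|name| {
            if case_sensitive {
                name.to_string()
            } else {
                name.to_ascii_lowercase()
            }
        })
        .collect()
}

/// Average-case O(1) check for whether a logical column is absent from the file.
/// The logical name is folded the same way the set was built.
/// (Hypothetical helper; the name and signature are illustrative.)
fn is_missing(logical_name: &str, physical: &HashSet<String>, case_sensitive: bool) -> bool {
    if case_sensitive {
        !physical.contains(logical_name)
    } else {
        !physical.contains(logical_name.to_ascii_lowercase().as_str())
    }
}
```

With d logical columns and n physical columns, building the set costs O(n) once and each lookup is O(1) on average, so the whole pass is O(d + n) rather than O(d × n).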
Which issue does this PR close?

Closes #22339.
Rationale for this change

Issue #22339 asks for a Spark-compatible Parquet reader in the `datafusion-spark` crate, based on functionality that already exists in apache/datafusion-comet. The schema adapter is the central piece of that reader: it rewrites physical expressions at planning time so that column references against the logical (query) schema resolve correctly to the physical (file) schema while preserving Spark's vectorized-reader semantics.

This PR ports that schema adapter into `datafusion-spark` so DataFusion users can read Parquet files with Spark semantics by plugging a `SparkPhysicalExprAdapterFactory` into a `FileScanConfig`.

What changes are included in this PR?
A new `parquet` feature on `datafusion-spark` (off by default), exposing:

- `SparkPhysicalExprAdapterFactory` / `SparkPhysicalExprAdapter`: implements `PhysicalExprAdapterFactory` for use via `FileScanConfigBuilder::with_expr_adapter`.
- `SparkParquetOptions` with all the version-sensitive flags (`allow_type_promotion`, `return_null_struct_if_all_fields_missing`, `case_sensitive`, `use_field_id`, `ignore_missing_field_id`, etc.) and an `EvalMode` enum.
- `spark_parquet_convert`: Spark-compatible nested struct/list/map adaptation, INT96 timezone reinterpret, FixedSizeBinary(16) → UUID rendering.
- `SparkCastColumnExpr`: `PhysicalExpr` for column-level type adaptation (timestamp micros → millis, nested field-name relabel, fallback to `spark_parquet_convert`).
- `RejectOnNonEmpty`: defers type-promotion rejection to runtime so empty Parquet files still pass (SPARK-26709).
- `ParquetSchemaError`: the four Parquet-relevant error variants from Comet's `SparkError` (`SchemaConvert`, `MissingFieldIds`, `DuplicateFieldByFieldId`, `DuplicateFieldCaseInsensitive`).

The full set of Spark vectorized-reader rejection rules is ported, including:

- […] `int → string`, no `binary → decimal` without `DecimalLogicalTypeAnnotation`).
- […] `isDecimalTypeMatched`).
- […] `canReadAsDecimal`).

Per-Spark-version behavior
Documented in `parquet/mod.rs`. Configured via `SparkParquetOptions` flags:

- `allow_type_promotion`: Spark 3.x rejects INT32→INT64 / FLOAT→DOUBLE / INT32→DOUBLE; Spark 4.x allows them.
- `return_null_struct_if_all_fields_missing`: flips at SPARK-53535 (Spark 4.1+).
- `eval_mode`: Spark 4.0 made `Ansi` the default.

Simplified vs Comet (deliberately)
- […] `CastExpr` rather than Comet's full Spark-compatible `CastPhysicalExpr`. Spark's rejection rules still apply at the schema-adapter level; only the cast kernel itself is DataFusion's. Adding a Spark-specific `CastPhysicalExpr` to `datafusion-spark` is a separate task.
- […] `SparkError` variants are ported. Comet's full `SparkError` enum exists for its JNI bridge to the Spark JVM, which is not relevant here.

Are these changes tested?
Yes: 27 new tests, all passing (5 unit + 22 integration tests that round-trip Parquet through `DataSourceExec`). The integration tests cover every rejection rule that has a corresponding test in Comet's `schema_adapter.rs`:

[…]

Total `cargo test -p datafusion-spark --features "core parquet" --lib`: 253 passed, 0 failed.

Are there any user-facing changes?
Yes: new public API behind the `parquet` feature on `datafusion-spark`:

[…]

No changes to existing APIs; the new module is gated behind the optional `parquet` feature.

🤖 Generated with Claude Code
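To make the per-Spark-version flags described earlier more concrete, here is a rough sketch of how a caller might derive two of them from a Spark major version. It is not the API added by this PR. The struct and enum below are local stand-ins that only borrow the names `SparkParquetOptions`, `EvalMode`, `allow_type_promotion`, and `eval_mode` from the description. The `Legacy` variant name and the helper `options_for_spark_major` are assumptions. `return_null_struct_if_all_fields_missing` is left out because the description says only that it flips at Spark 4.1+, not in which direction.

```rust
/// Local stand-in for the `EvalMode` enum named in the description.
/// Only `Ansi` is mentioned there; `Legacy` is an assumed name for the
/// non-ANSI default used before Spark 4.0.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvalMode {
    Legacy,
    Ansi,
}

/// Local stand-in carrying two of the flags listed for `SparkParquetOptions`.
/// The real struct has more fields and may be constructed differently.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SparkParquetOptions {
    allow_type_promotion: bool,
    eval_mode: EvalMode,
}

/// Hypothetical helper (not part of the PR): choose flag values that follow
/// the per-version notes in the description.
fn options_for_spark_major(major: u32) -> SparkParquetOptions {
    let spark_4_or_later = major >= 4;
    SparkParquetOptions {
        // Spark 3.x rejects INT32→INT64 / FLOAT→DOUBLE / INT32→DOUBLE; 4.x allows them.
        allow_type_promotion: spark_4_or_later,
        // Spark 4.0 made ANSI mode the default.
        eval_mode: if spark_4_or_later {
            EvalMode::Ansi
        } else {
            EvalMode::Legacy
        },
    }
}
```

A preset like this keeps version-specific decisions in one place. Callers who run Spark with non-default settings would still need to override individual flags.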